package org.databandtech.mockmq;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.databandtech.mockmq.entity.EpgVod;
import com.google.gson.Gson;

/*
 * java -jar mockmq.jar localhost:9092 Hello-Kafka 10

参数1：host
参数2：topic_name
参数3：创建的数据量
 */
/**
 * Mock data generator: builds random {@link EpgVod} records, serializes them to
 * JSON with Gson and publishes them to a Kafka topic.
 *
 * Usage: EpgVodKafkaProducer &lt;bootstrap-servers&gt; &lt;topic&gt; &lt;count&gt;
 */
public class EpgVodKafkaProducer {

	static String HOST = "192.168.13.52:9092"; // default bootstrap servers; overridden by args[0]
	static String TOPIC = "EPGVOD";            // default topic name; overridden by args[1]
	static int COUNT = 10;                     // number of records to send; overridden by args[2]
	final static int PARTITION = 0;            // fixed partition (only used by the optional fixed-partition send)
	// Duplicated entries raise the probability that value is picked at random.
	final static String[] SYS = {"t","t","u","m","m","m"}; // telecom / unicom / mobile operator ids
	final static String[] STBTYPE = {"huawei","mi","mi","mi","oppo","oppo"};
	final static String[] TERMINALTYPE = {"linux_STB","android_STB","PCClient","Ipad","iphone"};
	final static String	MEDIACODE_PRRFIX ="vid-"; // final vid ranges "vid-1".."vid-200", convenient for stats
	final static String[] DEFINITION = {"0","1","2","3"};
	final static String[] REFER_PAGE_ID = {"home","movie","movie","movie","child","child","series"};
	final static String[] ACTION_TYPE = {"browsing","timeshift","tvod_playing","tvod_playing","tv_playing","tv_playing","tv_playing","vod_playing","vod_playing"};
	final static String[] AREA_CODE = {"北京","北京","北京","上海","上海","上海","广州","广州","深圳","深圳","重庆","杭州","武汉","南京","郑州","西安","成都","长沙"};
	final static String[] APPS = {"腾讯","爱奇艺","优酷"};

	/**
	 * Entry point. Expects exactly three arguments: host, topic name, record count.
	 * The original code indexed args[0..2] before checking the array length,
	 * which threw ArrayIndexOutOfBoundsException when arguments were missing,
	 * and kept running after printing the usage message.
	 */
	public static void main(String[] args) {

		//madeTenEntity();

		if (args.length < 3 || args[0].isEmpty() || args[1].isEmpty() || args[2].isEmpty()) {
			System.out.println("请定义参数，例如EpgVodKafkaProducer 192.168.13.52:9092 topic_name 100");
			return; // stop instead of falling through to the producer
		}

		HOST = args[0];
		TOPIC = args[1];
		COUNT = Integer.parseInt(args[2]);

		processProducer();
	}

	/**
	 * Debug helper (currently unused): builds ten entities and prints their JSON
	 * to stdout together with the elapsed time, without touching Kafka.
	 */
	private static void madeTenEntity() {
		long starttime = System.currentTimeMillis();
		Gson gson = new Gson();
		for (int i = 1; i <= 10; i++) {
			EpgVod vod = new EpgVod();
			madeEntity(vod);
			String jsonStr = gson.toJson(vod);
			System.out.println(jsonStr);
		}
		long endtime = System.currentTimeMillis();
		long duration = endtime - starttime;
		System.out.println("完成。总计时长： "+duration);
	}

	/**
	 * Creates the Kafka producer and sends COUNT JSON records to TOPIC.
	 * All records but the last are sent asynchronously (fire-and-forget);
	 * the last one is sent synchronously so we know the batch reached the broker
	 * before the elapsed time is reported.
	 */
	private static void processProducer() {
		Properties properties = new Properties();
		properties.put("bootstrap.servers", HOST);
		// acks: 0 = producer does not wait for any broker ack
		//       1 = leader acks after it has written the record
		//      -1 = ack only after all in-sync replicas have the record
		properties.put("acks", "-1");
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		System.out.println("kafka连接成功，开始生成数据 ");
		long starttime = System.currentTimeMillis();
		Gson gson = new Gson();
		// try-with-resources closes (and therefore flushes) the producer; the
		// original never closed it, so buffered async records could be lost on exit.
		try (org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer =
				new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties)) {
			for (int i = 1; i <= COUNT; i++) {
				EpgVod vod = new EpgVod();
				madeEntity(vod);
				// Fixed-partition variant, kept for reference:
				//ProducerRecord<String, String> pr = new ProducerRecord<String, String>(TOPIC,PARTITION,"key"+i, msg +"--"+i);
				String jsonStr = gson.toJson(vod);
				ProducerRecord<String, String> pr = new ProducerRecord<String, String>(TOPIC, jsonStr);
				if (i == COUNT) {
					// Synchronous send for the final record: block on the future
					// and report the broker-assigned partition/offset.
					try {
						RecordMetadata metadata = kafkaProducer.send(pr).get();
						System.out.println(jsonStr);
						System.out.println("TopicName : " + metadata.topic() + " Partiton : " + metadata
			                    .partition() + " Offset : " + metadata.offset()+"--本批次第："+i);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt(); // restore interrupt status
						e.printStackTrace();
					} catch (ExecutionException e) {
						e.printStackTrace();
					}
				} else {
					kafkaProducer.send(pr); // async, default partitioner
				}
			}
		}
		long endtime = System.currentTimeMillis();
		long duration = endtime - starttime;
		System.out.println("完成。总计时长： "+duration);
	}

	/**
	 * Fills the given entity with randomized mock values drawn from the
	 * constant pools above. Fields with no mock source are set to "".
	 */
	private static void madeEntity(EpgVod vod) {
		vod.setAction_type(ACTION_TYPE[Mock.getNum(0, ACTION_TYPE.length-1)] );
		vod.setArea_code(AREA_CODE[Mock.getNum(0, AREA_CODE.length-1)]);
		vod.setBitrate("2k");
		vod.setCurrentplaytime("");
		vod.setDefinition(DEFINITION[Mock.getNum(0, DEFINITION.length-1)]);
		// NOTE(review): group ids reuse the DEFINITION pool ("0".."3") — presumably
		// any small id set will do for mock data; confirm against downstream consumers.
		vod.setEpg_group_id(DEFINITION[Mock.getNum(0, DEFINITION.length-1)]);
		vod.setLog_time(System.currentTimeMillis()+"");
		vod.setMediacode(MEDIACODE_PRRFIX+Mock.getNumString(200));
		vod.setRefer_page_id(REFER_PAGE_ID[Mock.getNum(0, REFER_PAGE_ID.length-1)]);
		vod.setRefer_type(DEFINITION[Mock.getNum(0, DEFINITION.length-1)]);
		vod.setStart_time("");
		vod.setStb_id("");
		vod.setStb_ip("192.168.1." + Mock.getNumString(200));

		vod.setUser_group_id(DEFINITION[Mock.getNum(0, DEFINITION.length-1)]);
		vod.setTerminal_type(TERMINALTYPE[Mock.getNum(0, TERMINALTYPE.length-1)]);

		// user_id embeds the operator id so user and sys_id stay consistent.
		String sysid = SYS[Mock.getNum(0, SYS.length-1)];
		vod.setSys_id(sysid);
		vod.setUser_id("uid-" + sysid + Mock.getNumString(10000000));

		vod.setStb_type(STBTYPE[Mock.getNum(0, STBTYPE.length-1)]);
		vod.setStb_mac("");
	}

}
